Skip to content

Conversation

@mzient
Copy link
Contributor

@mzient mzient commented Feb 2, 2023

Category:

New feature (non-breaking change which adds functionality)

Description:

This PR introduces a new thread pool object that will eventually replace the current thread pool.
This thread pool separates the definition of work (called a Job) from the runner threads, allowing multiple concurrent jobs to share one pool of threads.
There are two types of job:
Job and IncrementalJob.
The former must be fully defined before it is submitted to the thread pool. The tasks in this kind of job have priorities.
The latter can be submitted in parts. This precludes sorting (once submitted to the thread pool, the tasks are processed in FIFO order).
The thread pool allows for some overlap between jobs, although when a job is submitted, all of its task will be picked up for execution before any tasks from any subsequent job.
After submitting a job, the job owner has to wait for the job to complete.

The new thread pool is reentrant - it is legal to submit and wait for a sub-job from inside a thread pool task. This scenario is detected automatically ant the Wait function cooperatively executes the tasks from the thread pool until the inner job completes.

Additional information:

The thread pool uses a plain std::queue for task storage and a counting semaphore (POSIX) and a mutex for synchronization.
The job uses an atomic counter of outstanding tasks and atomic_wait to wait for completion.

Affected modules and functionalities:

None. This is entirely new code that doesn't affect any existing functionality yet.

Key points relevant for the review:

Tests:

  • Existing tests apply
  • New tests added
    • Python tests
    • GTests
    • Benchmark
    • Other
  • N/A

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: DALI-864

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [7183171]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [7183171]: BUILD FAILED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [7189340]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [7189340]: BUILD PASSED

@mzient mzient force-pushed the NewThreadPool branch 5 times, most recently from f0d76a6 to d74ec03 Compare February 9, 2023 12:49
@mzient mzient force-pushed the NewThreadPool branch 2 times, most recently from a181cde to 70deb50 Compare February 27, 2023 12:55
@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@mzient mzient force-pushed the NewThreadPool branch 2 times, most recently from 466d38d to e053ff7 Compare August 24, 2023 13:13
@mzient mzient force-pushed the NewThreadPool branch 3 times, most recently from 86c7337 to f2a437f Compare December 17, 2025 07:46
@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41172120]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41172120]: BUILD FAILED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41183936]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41184260]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41187088]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41187088]: BUILD FAILED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41571534]: BUILD STARTED

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41571534]: BUILD FAILED

mzient and others added 16 commits January 13, 2026 09:29
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
… a job object, reexposing the legacy thread pool interface.

Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michał Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
@mzient mzient changed the title [WIP] New thread pool New thread pool Jan 14, 2026
@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41726080]: BUILD STARTED

Signed-off-by: Michal Zientkiewicz <[email protected]>
@mzient mzient marked this pull request as ready for review January 14, 2026 13:30
@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41727400]: BUILD STARTED

@greptile-apps
Copy link

greptile-apps bot commented Jan 14, 2026

Greptile Summary

This PR introduces a new thread pool implementation (ThreadPoolBase) with two job types: Job (priority-based) and IncrementalJob (FIFO). The implementation uses POSIX semaphores for worker thread synchronization and atomic counters with condition variables for job completion signaling.

Key implementation details:

  • Thread pool uses std::queue for task storage with semaphore-based synchronization
  • Jobs wrap tasks in error-catching lambdas that decrement num_pending_tasks_ atomically
  • Reentrant design allows submitting sub-jobs from within tasks via WaitOrRunTasks
  • TaskBulkAdd batches task additions and defers semaphore release until Submit()

The code is well-structured with comprehensive tests covering basic operations, error handling, reentrancy, and incremental job execution. The synchronization primitives are used correctly throughout.

Confidence Score: 4/5

  • This PR is safe to merge with minor considerations for edge cases
  • The implementation demonstrates solid concurrent programming practices with proper use of synchronization primitives. Comprehensive tests verify correctness including reentrancy scenarios. Score is 4 rather than 5 due to complexity of concurrent code and potential edge cases in shutdown sequences
  • Pay close attention to dali/core/exec/thread_pool_base.cc for the shutdown logic and WaitOrRunTasks reentrant execution flow

Important Files Changed

Filename Overview
dali/core/exec/thread_pool_base.cc Implementation of thread pool with job management, semaphore-based task scheduling, and reentrant wait support
include/dali/core/exec/thread_pool_base.h Header defining thread pool classes with priority-based Job and FIFO IncrementalJob, using atomic counters and condition variables
dali/core/exec/thread_pool_base_test.cc Comprehensive test suite covering basic operations, error handling, reentrancy, and incremental job execution

Sequence Diagram

sequenceDiagram
    participant Client
    participant Job
    participant ThreadPoolBase
    participant Semaphore
    participant WorkerThread
    participant Task

    Note over Client,Task: Normal Job Execution Flow
    Client->>Job: AddTask(runnable, priority)
    Job->>Job: Store in tasks_ multimap (sorted by priority)
    
    Client->>Job: Run(tp, wait=true)
    Job->>ThreadPoolBase: BeginBulkAdd()
    ThreadPoolBase-->>Job: TaskBulkAdd object
    
    loop For each task (by priority)
        Job->>ThreadPoolBase: batch.Add(task)
        ThreadPoolBase->>ThreadPoolBase: AddTaskNoLock() [mutex held]
        ThreadPoolBase->>ThreadPoolBase: tasks_.push()
    end
    
    Job->>ThreadPoolBase: batch.Submit() [destructor]
    ThreadPoolBase->>Semaphore: release(tasks_added)
    
    Semaphore-->>WorkerThread: wake up
    WorkerThread->>Semaphore: acquire()
    WorkerThread->>ThreadPoolBase: PopAndRunTask()
    ThreadPoolBase->>Task: execute()
    Task->>Job: decrement num_pending_tasks_
    
    alt Last task completed
        Task->>Job: DoNotify()
        Job->>Job: num_pending_tasks_.notify_all()
        Job->>Job: cv_.notify_all()
        Job->>Job: running_ = false
    end
    
    Job->>Job: Wait()
    
    alt Called from thread pool
        Job->>ThreadPoolBase: WaitOrRunTasks(cv_, condition)
        Note over ThreadPoolBase: Reentrant: execute tasks while waiting
        loop Until condition met
            ThreadPoolBase->>ThreadPoolBase: PopAndRunTask()
        end
    else Called from external thread
        Job->>Job: num_pending_tasks_.wait()
        Note over Job: Atomic wait on counter
    end
    
    Job-->>Client: Return (rethrow errors if any)
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41726080]: BUILD FAILED

Signed-off-by: Michal Zientkiewicz <[email protected]>
@dali-automaton
Copy link
Collaborator

CI MESSAGE: [41730522]: BUILD STARTED

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants